Split msgs in iwant response if bigger than limit #944
Conversation
Codecov Report

@@            Coverage Diff             @@
##           unstable    #944      +/-   ##
============================================
+ Coverage     83.07%   83.10%   +0.03%
============================================
  Files            91       91
  Lines         15290    15297       +7
============================================
+ Hits          12702    12713      +11
+ Misses         2588     2584       -4
Force-pushed from 3e5a638 to 8983767
# Check if the encoded message size exceeds the maxMessageSize
if encoded.len > p.maxMessageSize:
  var controlSent = false
  # Split the RPCMsg into individual messages and send them separately
this feels like it should be a responsibility of the caller - otherwise, we should change the send function to take a list of messages instead of a full RPCMsg
- also, a single message might itself be bigger than the limit, so the utility of this change seems limited compared to the complexity it brings.
I agree. I only did it here to use the encoded message and not add the overhead of calculating the message size.
If individual messages bigger than the size limit are a common case, we should revisit the message limit. This change allows us to send messages below the size limit that can be sent individually but not as a group.
Now we can split the RPCMsg before calling send. But then we need to create a seq to pass it. Wdyt @arnetheduck ?
@@ -272,7 +272,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [], async.} =

   await conn.close() # This will clean up the send connection

-proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
+proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool): seq[RPCMsg] {.raises: [].} =
Most of the time, sendEncoded is used directly here:
https://github.com/status-im/nim-libp2p/blob/c6aa085e98e7526cb8d4415cb9a7f886e6dcab30/libp2p/protocols/pubsub/pubsub.nim#L199-L202
So this fix won't be enough
I believe the code related to an iwant response uses send:
https://github.com/status-im/nim-libp2p/blob/0cdfb0fedcb5f19199c3f3607dd6ff67740fb937/libp2p/protocols/pubsub/gossipsub.nim#L298-L300
But I can also move this solution to sendEncoded, as it is called by send. Wdyt?
Good point, no preference here
Btw in the current implementation, I split the msgs after p.sendObservers(mm) and sendMetrics(mm). Do those procs require splitting too?
I would say no
But to do that we need to create an RPCMsg, add Messages, and calculate its size.
the size can be pre-computed - this would be the other way to write the above loop (this is what most protobuf encoders do btw - they precompute size then allocate memory in a single shot).
the overhead of rpcmsg itself is static so it can easily be accounted for - this would be the other way to implement this, ie precompute sizes and split based on them and the rpcmsg overhead itself.
Efficiency is indeed key in gossipsub because of its multiplicative nature - any overhead quickly explodes as message counts go up - at this point, we should be mindful of it and indeed, one of the best things we could do in terms of overall efficiency of gossipsub (and therefore nimbus) would be to improve the efficiency of rpcmsg encoding and decoding in general.
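To illustrate the "precompute sizes" idea mentioned above: protobuf framing overhead is deterministic, so the wire size of a length-delimited field can be computed without encoding anything. This is a minimal Python sketch (function names are hypothetical, and a 1-byte field tag is assumed, which holds for field numbers below 16):

```python
def varint_len(n: int) -> int:
    # Bytes needed to encode n as a protobuf varint (7 payload bits per byte).
    size = 1
    while n >= 0x80:
        n >>= 7
        size += 1
    return size

def framed_len(payload_len: int) -> int:
    # Wire size of one length-delimited protobuf field, assuming a 1-byte tag:
    # tag byte + length varint + the payload itself.
    return 1 + varint_len(payload_len) + payload_len
```

With such helpers, the per-message overhead of wrapping a Message into an RPCMsg can be accounted for exactly, instead of guessing a flat percentage.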
so how about this?
1 - We create a len proc for RPCMsg and all other types used within it that don't have one already.
2 - We use those procs to estimate the serialized size of an RPCMsg and split it based on that. Let's say len(serializedRPCMsg) = 1.1 * len(rpcMsg) - guessing a 10% Protobuf overhead.
3 - We send each RPCMsg individually.
Pseudo-code below:
Function splitRPCMsg(rpcMsg: RPCMsg, maxSize: int) -> List[RPCMsg]:
    Initialize newRPCMsgs as an empty list of RPCMsg objects
    Initialize currentRPCMsg as a new RPCMsg object with the same non-message fields
        # We send them only in the first RPCMsg; the next ones have only Messages.
    Initialize currentSize = len(currentRPCMsg)  # This includes the size of control and other fields

    For each msg in rpcMsg.messages:
        msgSize = len(msg)

        # Check if adding the next message will exceed maxSize
        If (currentSize + msgSize) * 1.1 > maxSize:
            Add currentRPCMsg to newRPCMsgs
            Initialize currentRPCMsg as a new, empty RPCMsg object
            currentSize = 0

        # Add the message to the current RPCMsg
        Add msg to currentRPCMsg.messages
        currentSize += msgSize

    # Check if there is a non-empty currentRPCMsg left to be added
    If currentSize > 0:
        Add currentRPCMsg to newRPCMsgs

    Return newRPCMsgs
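The greedy batching in the pseudocode above can be sketched concretely. This is an illustrative Python version operating on message sizes only (names and the base_size/pb_overhead parameters are hypothetical; the real code works on Nim RPCMsg objects):

```python
def split_sizes(message_sizes, max_size, base_size=0, pb_overhead=1.1):
    # Greedy batching mirroring the pseudocode: accumulate message sizes until
    # the estimated serialized size would exceed max_size, then flush the
    # current batch and start a new one.
    batches = []
    current, current_size = [], base_size
    for size in message_sizes:
        # Never flush an empty batch: a single oversized message still gets
        # emitted on its own rather than being dropped.
        if current and (current_size + size) * pb_overhead > max_size:
            batches.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        batches.append(current)
    return batches
```

For example, three 400-byte messages against a 1000-byte limit split into a batch of two plus a batch of one, since (800 + 400) * 1.1 exceeds the limit.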
this can be done deterministically, there's code in nim-protobuf-serialization showing how - also, unless there's need for it, better to return the encoded messages (or even better, turn this into an iterator which yields the encoded messages so that there is no complex seq involved)
Honestly we don't have to be super precise: even if we just split once we reach limit / 2 without counting pb overhead, each message would still be >500kb with probably <50 bytes overhead (or 0.01% overhead, assuming 1mb max message size).
That's why I also proposed originally to split in the handleIWant directly (or somewhere like that). Even just sending each reply in a separate message wouldn't have crazy overhead.
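The overhead estimate in this argument checks out arithmetically. A quick sketch using the numbers stated above (1 MB max message size, flushing at limit / 2, ~50 bytes of framing per batch):

```python
max_message_size = 1_000_000          # 1 MB limit, as assumed in the comment
split_threshold = max_message_size // 2  # flush once a batch reaches limit / 2
framing_bytes = 50                    # rough protobuf framing per batch

# Each flushed batch carries at least split_threshold bytes of payload,
# so the framing overhead ratio is bounded by:
overhead_ratio = framing_bytes / split_threshold  # 0.0001, i.e. 0.01%
```

So even this imprecise strategy wastes on the order of 0.01% of bandwidth, supporting the point that precise size accounting is not critical here.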
We don't use nim-protobuf-serialization in libp2p. Tried importing it and using computeObjectSize, but got errors about lack of pragma annotations and so on.
Force-pushed from 2f79610 to 5441441
@@ -116,3 +116,36 @@ func shortLog*(m: RPCMsg): auto =
    messages: mapIt(m.messages, it.shortLog),
    control: m.control.get(ControlMessage()).shortLog
  )

proc len(peerInfo: PeerInfoMsg): int =
  return peerInfo.peerId.len + peerInfo.signedPeerRecord.len
Suggested change:
- return peerInfo.peerId.len + peerInfo.signedPeerRecord.len
+ peerInfo.peerId.len + peerInfo.signedPeerRecord.len
Done
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
  trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
  var currentRPCMsg = RPCMsg(subscriptions: rpcMsg.subscriptions, control: rpcMsg.control,
this copy needs to be avoided when no splitting is needed
I believe when no split is needed the iterator isn't called:
if encoded.len > p.maxMessageSize:
  for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
    asyncSpawn p.sendEncoded(encodedSplitMsg)
else:
  # If the message size is within limits, send it as is
  debug "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
  asyncSpawn p.sendEncoded(encoded)
type
  TestGossipSub* = ref object of GossipSub

proc getPubSubPeer*(p: TestGossipSub, peerId: PeerId): PubSubPeer =
  proc getConn(): Future[Connection] =
    p.switch.dial(peerId, GossipSubCodec)

  let pubSubPeer = PubSubPeer.new(peerId, getConn, nil, GossipSubCodec, 1024 * 1024)
  debug "created new pubsub peer", peerId

  p.peers[peerId] = pubSubPeer

  onNewPeer(p, pubSubPeer)
  pubSubPeer

proc randomPeerId*(): PeerId =
  try:
    PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet()
  except CatchableError as exc:
    raise newException(Defect, exc.msg)
Why this?
If you remove it, you can also make onNewPeer private again
This already existed, I just moved it to this file.
I did it because I tried to use getPubSubPeer in my test, but it isn't necessary anymore. I tried to revert it, but I got multiple errors as testgossipinternal calls onNewPeer and it works only because the gossipsub file was included there. I can no longer do that as I import utils in testgossipinternal now and it causes an error. Yeah, I know, messy.
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
  trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
  var currentRPCMsg = RPCMsg(subscriptions: rpcMsg.subscriptions, control: rpcMsg.control,
This code (& the len part) seems very prone to failure if we add a field to RPCMsg, update a field, etc.
We should at the very least add a comment to the RPCMsg declaration to warn about that, and ideally find a better solution.
It seems that the best solution would be to use nim-protobuf-ser, but we can't yet unfortunately.
As an alternative, wouldn't it be simpler to modify encodeRpcMsg? It could take a maxSize, and return a seq[seq[byte]], or even just concatenate the PBs directly internally (would require some tweaks to the pubsubpeer).
I think adding the comment is a good and easy-to-do improvement. But not sure about splitting in encodeRpcMsg, it seems too much abstraction leakage.
Added the comment
Force-pushed from fc213e2 to 3769169
Force-pushed from 3769169 to ed568e9
Force-pushed from 092a3b0 to dddd83f
Great job!
closes #887